refactor!: Introduce PXE JobCoordinator #19445
Conversation
Thunkar left a comment
Nice!
    for (const store of this.#stores.values()) {
      await store.commit(jobId);
Can we do Promise.all? Or do we have reason to believe that these writes cannot be made concurrently?
We should be able to do the Promise.all, since stores are disjoint. Let me tackle that once the more critical path is done.
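For reference, a minimal sketch of the concurrent variant being discussed, assuming the registered stores really are disjoint (the `#stores` field is taken from the snippet above):

```ts
// Sketch only: commit every registered store's staged writes concurrently.
// This relies on the stores being disjoint, so their writes have no
// ordering dependency between them.
await Promise.all([...this.#stores.values()].map(store => store.commit(jobId)));
```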
     * Checks if there's a job currently in progress.
     */
    hasJobInProgress(): boolean {
      return this.#currentJobId !== undefined;
Why receive the job id in the fns (commit and abort) instead of exposing it? Is it to ensure that the caller is properly tracking the job id they got from begin?
Yeah, I thought of the API more around the possibility of having multiple concurrent jobs, so when/if that time comes #currentJobId would become #currentJobs: string[], the internal checks would get relaxed, but the external API would look the same.
Not strictly necessary for it to be like that in its current incarnation, but I also don't see anything bad with it.
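A rough, purely illustrative sketch of that future relaxation (not part of this PR; the method bodies are assumptions):

```ts
import { randomUUID } from 'node:crypto';

// Illustrative only: the external API keeps receiving jobIds, while the internal
// tracking is relaxed from a single #currentJobId to a list of in-flight jobs.
class JobCoordinator {
  #currentJobs: string[] = [];

  beginJob(): string {
    const jobId = randomUUID();
    this.#currentJobs.push(jobId);
    return jobId;
  }

  hasJobInProgress(): boolean {
    return this.#currentJobs.length > 0;
  }

  async commitJob(jobId: string): Promise<void> {
    // ...commit every registered store for this jobId (elided)...
    this.#currentJobs = this.#currentJobs.filter(id => id !== jobId);
  }
}
```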
Second part of the series started with #19445. This makes the CapsuleStore work based on staged writes. With this, capsules aren't written to persistent storage until PXE decides to commit the job.
Third part of the series started with #19445. This makes the stores related to tagging synchronization work based on staged writes.
Fourth part of the series started with #19445. This makes the PrivateEventStore work based on staged writes. With this, private events aren't written to persistent storage until PXE decides to commit the job.
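A hypothetical illustration of the per-job staging these follow-up parts describe; the class and method names below are made up for the example, not taken from the actual stores:

```ts
// Hypothetical sketch of per-job staged writes: values are buffered in memory
// keyed by jobId and only reach persistent storage once the job is committed.
class StagedWrites<T> {
  // jobId -> (key -> pending value)
  #staged = new Map<string, Map<string, T>>();

  /** Buffers a write under jobId without touching persistent storage. */
  stage(jobId: string, key: string, value: T): void {
    let pending = this.#staged.get(jobId);
    if (!pending) {
      pending = new Map<string, T>();
      this.#staged.set(jobId, pending);
    }
    pending.set(key, value);
  }

  /** Returns and clears the writes staged under jobId (used on commit or discard). */
  take(jobId: string): Map<string, T> {
    const pending = this.#staged.get(jobId) ?? new Map<string, T>();
    this.#staged.delete(jobId);
    return pending;
  }
}
```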
I decided to fragment #19293 into a smaller, more digestible (both for reviewers and for myself) series of PRs.
The end goal is to refactor PXE's stores so they work with "staged writes": every write to a store is kept in memory, segmented by a `jobId`, and is not written to the underlying KV store until a coordinated commit. Relevant stores will (in subsequent PRs) implement a new `StagedStore` interface, which defines the following methods:

- `commit(jobId)`: when called, moves all the in-memory data corresponding to `jobId` to the persistent KV store.
- `discardStaged(jobId)`: clears up any in-memory data structures associated with `jobId` without persisting.

Read operations can optionally receive a `jobId`; when one is provided, reads take into account both the persisted data and the data staged under that `jobId` (how both sources of data are unified is store-dependent).

A new `JobCoordinator` class exposes the following methods for PXE's convenience:

- `registerStores(stagedStores: StagedStore[])`: makes a collection of stores known to the `JobCoordinator`.
- `beginJob(): string`: called by PXE when a job starts, returns a `jobId` that then gets threaded through the job's phases.
- `commitJob(jobId)`: iterates over all registered stores, calling `commit(jobId)`, wrapped in a `transactionAsync` call to guarantee that all writes happen in the same KV transaction.
- `abortJob(jobId)`: same as `commitJob`, but calling `discardStaged`.

As a result, any data operations done before PXE decides to `commitJob` are discarded if PXE fails, the process is killed, etc.

This specific PR introduces the `JobCoordinator` class, makes PXE jobs use it, and threads `jobId`s through `ContractFunctionSimulator` and the oracles, from where they will be used as params to store operations.
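A minimal sketch of the shapes described above; the method names follow the description, but the signatures and bodies are illustrative rather than the actual implementation in this PR:

```ts
import { randomUUID } from 'node:crypto';

// Sketch only: shapes follow the PR description, bodies are illustrative.
interface StagedStore {
  /** Moves all in-memory data staged under jobId to the persistent KV store. */
  commit(jobId: string): Promise<void>;
  /** Drops any in-memory data staged under jobId without persisting it. */
  discardStaged(jobId: string): Promise<void>;
}

class JobCoordinator {
  #stores: StagedStore[] = [];
  #currentJobId?: string;

  /** Makes a collection of staged stores known to the coordinator. */
  registerStores(stagedStores: StagedStore[]): void {
    this.#stores.push(...stagedStores);
  }

  /** Called by PXE when a job starts; returns the jobId threaded through its phases. */
  beginJob(): string {
    const jobId = randomUUID();
    this.#currentJobId = jobId;
    return jobId;
  }

  hasJobInProgress(): boolean {
    return this.#currentJobId !== undefined;
  }

  /** Commits every registered store's staged writes for this job. */
  async commitJob(jobId: string): Promise<void> {
    // In the real implementation this loop is wrapped in a single KV transaction
    // (transactionAsync) so that all stores commit atomically.
    for (const store of this.#stores) {
      await store.commit(jobId);
    }
    this.#currentJobId = undefined;
  }

  /** Discards every registered store's staged writes for this job. */
  async abortJob(jobId: string): Promise<void> {
    for (const store of this.#stores) {
      await store.discardStaged(jobId);
    }
    this.#currentJobId = undefined;
  }
}
```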